Fix join backfills when a new partition column is set for the Query #783
Conversation
Commits:
- pass leftSpec
- use partition column of left for scanDf
- Add integration test for the notds case
- scalafmt
Walkthrough
New event sources and group-by configurations using partitioned tables were added for purchases and training set joins in the canary test suite. Shell scripts were updated to handle the new datasets, and Scala code was refactored to clarify partition column usage and streamline log formatting. No public API signatures were changed.
Sequence Diagram(s)
sequenceDiagram
    participant Script
    participant BQ
    participant Zipline
    participant Purchases
    participant TrainingSet
    Script->>BQ: Delete old _notds tables
    Script->>Zipline: Run backfill for purchases_v1_test_notds
    Zipline->>Purchases: Aggregate purchases_notds by user_id
    Script->>Zipline: Run backfill for training_set_v1_test_notds
    Zipline->>TrainingSet: Join checkouts_notds with purchases_notds group-by
Actionable comments posted: 0
🧹 Nitpick comments (1)
scripts/distribution/run_gcp_quickstart.sh (1)
164-164: Extra blank line. Minor formatting issue.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro (Legacy)
📒 Files selected for processing (7)
api/python/test/canary/group_bys/gcp/purchases.py (1 hunks)
api/python/test/canary/joins/gcp/training_set.py (2 hunks)
scripts/distribution/run_gcp_quickstart.sh (2 hunks)
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (4 hunks)
spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (1 hunks)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (1 hunks)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (1)
  spark/src/main/scala/ai/chronon/spark/Extensions.scala (1)
    pretty (40-52)
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)
  cloud_gcp/src/main/scala/ai/chronon/integrations/cloud_gcp/BigQueryNative.scala (2)
    partitions (186-263)
    primaryPartitions (139-184)
  spark/src/main/scala/ai/chronon/spark/catalog/Format.scala (1)
    primaryPartitions (49-72)
api/python/test/canary/joins/gcp/training_set.py (3)
  api/src/main/scala/ai/chronon/api/Builders.scala (1)
    Source (106-140)
  api/python/ai/chronon/source.py (1)
    EventSource (8-35)
  api/python/ai/chronon/query.py (1)
    selects (103-126)
⏰ Context from checks skipped due to timeout of 90000ms (29)
- GitHub Check: service_tests
- GitHub Check: streaming_tests
- GitHub Check: service_commons_tests
- GitHub Check: streaming_tests
- GitHub Check: service_tests
- GitHub Check: online_tests
- GitHub Check: spark_tests
- GitHub Check: online_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: flink_tests
- GitHub Check: flink_tests
- GitHub Check: analyzer_tests
- GitHub Check: batch_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: spark_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: fetcher_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: groupby_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: api_tests
- GitHub Check: batch_tests
- GitHub Check: aggregator_tests
- GitHub Check: analyzer_tests
- GitHub Check: python_tests
- GitHub Check: aggregator_tests
🔇 Additional comments (19)
spark/src/test/scala/ai/chronon/spark/test/TableUtilsTest.scala (1)
24-24: Import reordering looks good. This is just a reordering of imports and doesn't affect functionality.
spark/src/main/scala/ai/chronon/spark/batch/StagingQuery.scala (1)
63-63: Cleaner log formatting. Simplified log formatting by using a concise lambda expression.
spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala (2)
133-134: Good variable introduction for partition column resolution. Introducing effectivePartColumn improves clarity and ensures the correct partition column is used consistently.
139-142: Consistently using the resolved partition column. Using effectivePartColumn in both logging and in the primaryPartitions call ensures consistent partition column handling (a hedged sketch of the pattern follows).
scripts/distribution/run_gcp_quickstart.sh (4)
75-76: Added cleanup for new _notds tables. Adding cleanup for tables with non-default partition columns in the canary environment.
83-84: Added cleanup for new _notds tables in the dev environment. Consistent with the changes for the canary environment above.
156-157: Added backfill for _notds tables in the canary environment. This supports the non-standard partition column tables introduced in the PR.
160-160: Added backfill for _notds tables in the dev environment. Ensures consistent handling in both environments.
spark/src/main/scala/ai/chronon/spark/JoinUtils.scala (5)
86-86: Improved partition column handling. This change explicitly extracts the partition column from the left specification, enabling correct partition column usage.
90-90: Fixed partition column reference. Now properly using the partition column from the left specification instead of the default partition column from tableUtils.
176-176: Corrected PartitionRange construction. Using leftSpec instead of tableUtils.partitionSpec ensures the partition range is built with the correct partition specification (see the sketch after this list).
328-329: Improved logging readability. Consolidated logging into a cleaner single-line format with string concatenation.
496-496: Simplified code structure. Removed unnecessary braces and line breaks for better readability.
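A self-contained sketch of the leftSpec idea: the scan range is built from the left source's partition spec instead of the job-wide default. The types, field names, and the way leftSpec is derived here are assumptions for illustration, not Chronon's actual definitions:

```scala
// Stand-in types; Chronon's real PartitionSpec / PartitionRange differ.
case class PartitionSpec(column: String, format: String)
case class PartitionRange(start: String, end: String)(val spec: PartitionSpec) {
  def whereClause: String = s"${spec.column} BETWEEN '$start' AND '$end'"
}

object LeftSpecExample {
  val defaultSpec = PartitionSpec("ds", "yyyy-MM-dd")

  // If the left side of the join declares its own partition column (e.g. "notds"),
  // derive a spec from it; otherwise fall back to the default spec.
  def leftSpec(leftPartitionColumn: Option[String]): PartitionSpec =
    leftPartitionColumn.map(col => defaultSpec.copy(column = col)).getOrElse(defaultSpec)

  def main(args: Array[String]): Unit = {
    val range = PartitionRange("2023-11-01", "2023-11-30")(leftSpec(Some("notds")))
    println(range.whereClause) // notds BETWEEN '2023-11-01' AND '2023-11-30'
  }
}
```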
api/python/test/canary/joins/gcp/training_set.py (4)
1-1: Updated import to access all purchases module objects. Changed to import the entire module to reference the new notds group-by configurations.
26-26: Updated references with module qualification. Now correctly referencing GroupBy objects through the purchases module.
Also applies to: 33-33
37-48: Added new source with custom partition column. Created a new source_notds that uses "notds" as partition column to support non-date-string partitioning.
50-62: Added Join configurations for notds partitioned data. New Join objects that work with the custom partition column, completing the integration test coverage.
api/python/test/canary/group_bys/gcp/purchases.py (2)
138-148: Added source with custom partition column. Created a source_notds with partition_column="notds" to test non-date-string partitioning.
150-202: Added GroupBy configurations for notds partitioned data. Created test and dev versions of GroupBy configurations using the custom partition column source (a schematic sketch of how these pieces fit together follows).
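For orientation, a schematic sketch (Scala, with made-up config case classes rather than the actual Chronon Python or Scala API) of how the notds variants relate: the source overrides the partition column, and the group-by and join are re-declared on top of it. Table names and keys are taken from the walkthrough above; everything else is illustrative.

```scala
// Made-up config types, only to show the shape of the notds test setup.
case class EventSource(table: String, partitionColumn: String = "ds")
case class GroupBy(name: String, source: EventSource, keys: Seq[String])
case class Join(name: String, left: EventSource, rightParts: Seq[GroupBy])

object NotdsCanaryShape {
  // Sources whose backing tables are partitioned on "notds" instead of "ds".
  val purchasesNotds = EventSource("purchases_notds", partitionColumn = "notds")
  val checkoutsNotds = EventSource("checkouts_notds", partitionColumn = "notds")

  // Group-by and join built over the notds-partitioned sources.
  val purchasesV1TestNotds = GroupBy("purchases_v1_test_notds", purchasesNotds, Seq("user_id"))
  val trainingSetV1TestNotds =
    Join("training_set_v1_test_notds", checkoutsNotds, Seq(purchasesV1TestNotds))

  def main(args: Array[String]): Unit =
    println(s"${trainingSetV1TestNotds.name} scans ${trainingSetV1TestNotds.left.partitionColumn}")
}
```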
From the diff in spark/src/main/scala/ai/chronon/spark/catalog/TableUtils.scala:

    if (!tableReachable(tableName)) return List.empty[String]
    val rangeWheres = andPredicates(partitionRange.map(_.whereClauses).getOrElse(Seq.empty))

    val effectivePartColumn = tablePartitionSpec.map(_.column).getOrElse(partitionColumnName)
Review comment: we might want to prioritize the spec in the range, then the table spec, then the default spec?
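A small sketch of the precedence this comment suggests, using stand-in types (rangeSpec, tableSpec, and defaultSpec are illustrative names, not actual fields in the codebase):

```scala
case class PartitionSpec(column: String, format: String)

object SpecPrecedence {
  // Suggested precedence: spec carried by the requested range, then the
  // table's own spec, then the job-wide default.
  def resolve(rangeSpec: Option[PartitionSpec],
              tableSpec: Option[PartitionSpec],
              defaultSpec: PartitionSpec): PartitionSpec =
    rangeSpec.orElse(tableSpec).getOrElse(defaultSpec)

  def main(args: Array[String]): Unit = {
    val default = PartitionSpec("ds", "yyyy-MM-dd")
    val table = Some(PartitionSpec("notds", "yyyy-MM-dd"))
    println(resolve(None, table, default).column) // notds
    println(resolve(None, None, default).column)  // ds
  }
}
```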
From the diff in spark/src/main/scala/ai/chronon/spark/JoinUtils.scala:

    logger.info(s"Attempting to fill join partition range: $leftStart to $leftEnd")
    - PartitionRange(leftStart, leftEnd)(tableUtils.partitionSpec)
    + PartitionRange(leftStart, leftEnd)(leftSpec)
Review comment: we might want to default to the tableUtils.partitionSpec if there's no leftSpec?
Reply: that auto magically happens
"Implicit boy"
Summary
Duplicate of #780 without the nasty merge conflicts
Checklist
Summary by CodeRabbit
New Features
Chores
Style